/**
 * Client for requesting data from a Bloomberg data-license style service.
 *
 * Two transports are supported:
 *  - a SOAP transport implemented in external C++ code (see the
 *    `extern (C++)` declarations below), used when the host is empty or
 *    starts with "http";
 *  - an SFTP transport: a request file is uploaded, the remote directory is
 *    polled until a gzip-compressed reply appears, and the reply is parsed.
 */
module bloomberg_dl.bloomberg_dl;

import libssh.session;
import libssh.sftp;
import libssh.errors;
import std.array;
import std.conv;
import std.datetime.systime : SysTime, Clock;
import std.datetime : UTC;
import std.zlib;
import std.algorithm;
import std.file;
import std.string : toStringz, fromStringz;
version(Windows)
{
    import core.stdc.stdio;
}
else
{
    import core.sys.posix.fcntl;
}


/// Releases a table previously returned by one of the C++ SOAP functions.
/// (The spelling of the symbol is dictated by the external C++ library.)
extern (C++)
void releaseTalbe(const (char)*** table);

/// C++ SOAP implementation of a "get data" request. Returns a
/// null-terminated array of null-terminated rows of C strings.
extern (C++)
const (char)*** soapGetData
(
    const (char)* host,
    const (char)* cert,
    const (char)* pass,
    const (char)** fields,
    const (char)** ident,
    bool withHeader,
    long interval,
    long retry
);

/// C++ SOAP implementation of a "get historical" request. Returns a
/// null-terminated array of null-terminated rows of C strings.
extern (C++)
const (char)*** soapGetHistorical
(
    const (char)* host,
    const (char)* cert,
    const (char)* pass,
    const (char)** fields,
    const (char)** ident,
    const (char)* source,
    long startDate,
    long endDate,
    bool withHeader,
    long interval,
    long retry
);


/**
 * Converts a D string array into a null-terminated array of C strings.
 *
 * Params:
 *   strs = strings to convert
 * Returns: pointer to `strs.length + 1` entries; the last entry is null.
 */
const (char)** toStrings(ref string[] strs)
{
    auto lines = new const(char)*[strs.length + 1];
    foreach (i, s; strs)
    {
        lines[i] = toStringz(s);
    }
    lines[strs.length] = null; // explicit terminator
    return lines.ptr;
}

/**
 * Copies a C string table into D strings and releases the C table.
 *
 * Params:
 *   table = null-terminated array of null-terminated rows, as returned by
 *           `soapGetData` / `soapGetHistorical`
 * Returns: one `string[]` per row; empty when `table` is null.
 */
string[][] converTalbe(const (char)*** table)
{
    string[][] result;
    // A null table would otherwise be dereferenced below.
    if (table is null)
    {
        return result;
    }
    // Release the C++ table even if copying throws.
    scope(exit) releaseTalbe(table);

    for (size_t i = 0; table[i] !is null; ++i)
    {
        string[] line;
        for (size_t j = 0; table[i][j] !is null; ++j)
        {
            line ~= to!string(fromStringz(table[i][j]));
        }
        result ~= line;
    }
    return result;
}

/// D-friendly wrapper around the C++ `soapGetData`; see `converTalbe` for
/// the shape of the result.
string[][] soapGetData(string host, string cert, string pass, string[] fields, string[] idents, bool withHeader, long interval, long retry)
{
    return converTalbe(soapGetData(toStringz(host), toStringz(cert), toStringz(pass), toStrings(fields), toStrings(idents), withHeader, interval, retry));
}

/// D-friendly wrapper around the C++ `soapGetHistorical`; see `converTalbe`
/// for the shape of the result.
string[][] soapGetHistorical(string host, string cert, string pass, string[] fields, string[] idents, string source, long startDate, long endDate, bool withHeader, long interval, long retry)
{
    return converTalbe(soapGetHistorical(toStringz(host), toStringz(cert), toStringz(pass), toStrings(fields), toStrings(idents), toStringz(source), startDate, endDate, withHeader, interval, retry));
}

/// Static entry points for fetching data either via SOAP or via SFTP.
class bloomberg_dl
{
private:
    /// Request file skeleton; the *_XXX markers are substituted before upload.
    static immutable string fileTemplate =
        "START-OF-FILE\n" ~
        "FIRMNAME=USER_XXX\n" ~
        "COMPRESS=yes\n" ~
        "FILETYPE=pc\n" ~
        "REPLYFILENAME=REPLY_XXX\n" ~
        "RANG_XXX\n" ~
        "PROGRAMNAME=PROGRAM_XXX\n" ~
        "\n" ~
        "START-OF-FIELDSFILED_XXX\n" ~
        "END-OF-FIELDS\n" ~
        "\n" ~
        "START-OF-DATADATA_XXX\n" ~
        "END-OF-DATA\n" ~
        "END-OF-FILE";

    // Separators used when parsing the reply file.
    // NOTE(review): "\n" on Windows and "\r\n" elsewhere looks inverted, and
    // the reply's line endings presumably do not depend on the client OS.
    // Kept as in the original - confirm against a real reply file.
    version(Windows)
    {
        static immutable string new_line = "\n";
        static immutable string start_split = "START-OF-DATA\n";
        static immutable string end_split = "\nEND-OF-DATA";
    }
    else
    {
        static immutable string new_line = "\r\n";
        static immutable string start_split = "START-OF-DATA\r\n";
        static immutable string end_split = "\r\nEND-OF-DATA";
    }

private:
    /**
     * Builds a request/reply base name: "api_" + prefix + a number composed
     * from the current UTC year, month, day, hour, minute, second and
     * millisecond.
     */
    static string getFileName(string prefix)
    {
        SysTime currentTime = Clock.currTime(UTC());
        long num = currentTime.fracSecs.total!"nsecs" / 1000000L; // milliseconds
        num += currentTime.second * 1000L;
        num += currentTime.minute * 100000L;
        num += currentTime.hour * 10000000L;
        num += currentTime.day * 1000000000L;
        num += to!long(currentTime.month) * 100000000000L;
        num += currentTime.year * 10000000000000L;
        return "api_" ~ prefix ~ to!string(num);
    }

    /**
     * Opens an SSH session and authenticates with a password.
     *
     * Returns: the connected session, or null when password authentication
     *          does not succeed. The session is disposed on every failure
     *          path so it is not leaked.
     */
    static SSHSession getSession(string host, string user, string pass)
    {
        auto session = new SSHSession();
        // If connect/auth throws, do not leak the session.
        scope(failure) session.dispose();

        session.host = host;
        session.user = user;
        session.logVerbosity = LogVerbosity.NoLog;
        session.connect();
        auto rc = session.userauthPassword(user, pass);
        if (AuthState.Success != rc)
        {
            session.dispose();
            return null;
        }
        return session;
    }

    /**
     * Lists entry names of the remote root directory "/".
     * Reading stops at end of directory or on the first `SSHException`.
     */
    static string[] list(SFTPSession session)
    {
        SFTPAttributes attr;
        string[] result;
        auto dir = session.openDir("/");
        // Close the directory handle on every exit path.
        scope(exit) dir.close();
        while (!dir.eof)
        {
            try
            {
                dir.readdir(attr);
                result ~= attr.name;
            }
            catch (SSHException)
            {
                break;
            }
        }
        return result;
    }

    /**
     * Polls the remote root directory until `file_name` appears.
     *
     * Params:
     *   interval = seconds to sleep between polls
     *   retry    = maximum number of polls
     * Returns: true if the file was seen, false after `retry` polls.
     */
    static bool wait_file(SFTPSession session, string file_name, long interval = 10, long retry = 100)
    {
        import core.thread;
        for (long attempt = 0; attempt < retry; ++attempt)
        {
            if (list(session).canFind(file_name))
            {
                return true;
            }
            // No point sleeping after the final poll.
            if (attempt + 1 < retry)
            {
                Thread.sleep(dur!("seconds")(interval));
            }
        }
        return false;
    }

    /// Fills the user, field list and identifier list into `fileTemplate`.
    /// Each field/identifier is emitted on its own line.
    static string getDataTemplateStringBase(string user, string[] fields, string[] idents)
    {
        string strFields = "";
        string strDatas = "";
        foreach (string field; fields)
        {
            strFields ~= "\n" ~ field;
        }
        foreach (string ident; idents)
        {
            strDatas ~= "\n" ~ ident;
        }
        string tmpStr = fileTemplate;
        tmpStr = replace(tmpStr, "USER_XXX", user);
        tmpStr = replace(tmpStr, "FILED_XXX", strFields);
        tmpStr = replace(tmpStr, "DATA_XXX", strDatas);
        return tmpStr;
    }

    /// Request text for a "getdata" program (REPLY_XXX is still unfilled).
    static string getDataTemplateString(string user, string[] fields, string[] idents)
    {
        auto result = getDataTemplateStringBase(user, fields, idents);
        result = replace(result, "RANG_XXX", "SECMASTER=yes");
        result = replace(result, "PROGRAM_XXX", "getdata");
        return result;
    }

    /// Request text for a "gethistory" program (REPLY_XXX is still unfilled).
    static string getHistoricalTemplateString(string user, string[] fields, string[] idents, string source, long startDate, long endDate)
    {
        auto result = getDataTemplateStringBase(user, fields, idents);
        string range = "PRICING_SOURCE=" ~ source
            ~ "\nDATERANGE=" ~ to!string(startDate) ~ "|" ~ to!string(endDate)
            ~ "\nHIST_FORMAT=horizontal\n";
        result = replace(result, "RANG_XXX", range);
        result = replace(result, "PROGRAM_XXX", "gethistory");
        return result;
    }

    /**
     * Uploads `content` to `path`. The data is first written to
     * `path ~ ".tmp"` and then renamed, so the final name only appears once
     * the content is complete.
     */
    static void write_file(SFTPSession session, string path, string content)
    {
        auto tmp_path = path ~ ".tmp";
        // NOTE(review): O_* / S_IRWXU are only brought in by the non-Windows
        // import branch at the top of the file - confirm Windows builds.
        int access_type = O_WRONLY | O_CREAT | O_TRUNC;
        int mode = S_IRWXU;
        {
            auto file = session.open(tmp_path, access_type, mode);
            // Close the remote handle even if the write throws.
            scope(exit) file.close();
            ubyte[] buffer = cast(ubyte[])content;
            file.write(buffer);
        }
        session.rename(tmp_path, path);
    }

    /**
     * Downloads `path` and gunzips it.
     *
     * Returns: the decompressed text, or "" if the file cannot be opened,
     *          a read fails, or the file is empty.
     */
    static string read_file(SFTPSession session, string path)
    {
        int access_type = O_RDONLY;
        int mode = 0;
        SFTPFile file = null;
        try
        {
            file = session.open(path, access_type, mode);
        }
        catch (SFTPException)
        {
            return "";
        }

        ubyte[] all_buff;
        {
            // Close the handle on every path, including the error returns.
            scope(exit) file.close();
            auto rd_buff = new ubyte[1024]; // reused; appended slices are copied
            while (true)
            {
                try
                {
                    auto cnt = file.read(rd_buff);
                    if (cnt < 0)
                    {
                        return "";
                    }
                    else if (cnt == 0)
                    {
                        break;
                    }
                    all_buff ~= rd_buff[0 .. cnt];
                }
                catch (SFTPException)
                {
                    return "";
                }
            }
        }

        // Nothing to decompress.
        if (all_buff.length == 0)
        {
            return "";
        }

        void[] result_buffer;
        auto uc = new UnCompress(HeaderFormat.gzip);
        result_buffer ~= uc.uncompress(all_buff);
        result_buffer ~= uc.flush();
        return cast(string)result_buffer;
    }

    /**
     * Downloads and parses a reply file.
     *
     * Only the text between `start_split` and `end_split` is used. Each line
     * is split on '|'; columns 1 and 2 and the last column are dropped.
     * Returns: parsed rows, or an empty result when the file yields no text.
     */
    static string[][] decode_file(SFTPSession session, string path)
    {
        string[][] result;
        auto str = read_file(session, path);
        if (str == "")
        {
            return result;
        }
        str = str.findSplit(start_split)[2];
        str = str.findSplit(end_split)[0];
        foreach (string line; str.split(new_line))
        {
            auto columns = line.split("|");
            string[] line_array;
            foreach (i, column; columns)
            {
                if (i == 1 || i == 2 || i + 1 == columns.length)
                {
                    continue;
                }
                line_array ~= column;
            }
            result ~= line_array;
        }
        return result;
    }

    /**
     * Joins each row with ',' and terminates every row with "\n".
     * NOTE(review): values are not quoted or escaped, so a value containing
     * ',' or a newline will produce ambiguous CSV.
     */
    static string convertCsv(string[][] data)
    {
        auto buffer = appender!string();
        foreach (row; data)
        {
            buffer.put(row.join(","));
            buffer.put("\n");
        }
        return buffer.data;
    }

    /**
     * Writes `str` to `path`. For `level` 1..9 the content is gzip-compressed
     * at that level and ".gz" is appended to the path; any other level
     * writes the text uncompressed.
     */
    static void saveData(string str, string path, int level)
    {
        if (level < 0 || level > 9)
            level = 0;
        if (0 == level)
        {
            std.file.write(path, str);
            return;
        }
        auto cmp = new Compress(level, HeaderFormat.gzip);
        auto result_buffer = cmp.compress(cast(void[])str);
        result_buffer ~= cmp.flush();
        path ~= ".gz";
        std.file.write(path, result_buffer);
    }

    /// True when the request should go through the SOAP transport: the host
    /// is empty or starts with "http".
    static bool useSoap(string host)
    {
        return host == "" || host.startsWith("http");
    }

    /**
     * Shared SFTP request/response cycle used by `getData` and
     * `getHistorical`.
     *
     * Params:
     *   prefix          = file name prefix passed to `getFileName`
     *   header          = header row, prepended when `withHeader` is true
     *   upload_template = request text with REPLY_XXX still unfilled
     * Returns: the optional header followed by the parsed reply rows; only
     *          the optional header when authentication fails or the reply
     *          does not appear within `retry` polls.
     */
    static string[][] requestViaSftp(string host, string user, string pass, string prefix, string[] header, string upload_template, bool withHeader, long interval, long retry)
    {
        string[][] result;
        if (withHeader)
        {
            result ~= header;
        }
        auto file_name = getFileName(prefix);
        auto wait_name = file_name ~ ".gz";
        auto req_file_name = "/" ~ file_name ~ ".req";
        auto res_file_name = "/" ~ file_name ~ ".gz";

        auto sshSession = getSession(host, user, pass);
        if (sshSession is null)
        {
            return result;
        }
        // Scope guards run in reverse order: SFTP session first, then SSH.
        scope(exit) sshSession.dispose();
        auto session = sshSession.newSFTP();
        scope(exit) session.dispose();

        auto upload_str = replace(upload_template, "REPLY_XXX", file_name);
        write_file(session, req_file_name, upload_str);
        if (!wait_file(session, wait_name, interval, retry))
        {
            return result;
        }
        result ~= decode_file(session, res_file_name);
        return result;
    }

public:
    /**
     * Requests current values of `fields` for `idents`.
     *
     * When `host` is empty or starts with "http" the SOAP transport is used
     * (with `user` passed as the certificate argument); otherwise the request
     * goes through SFTP.
     *
     * Params:
     *   withHeader = prepend a header row ("BbgID" followed by `fields`)
     *   interval   = seconds between polls for the reply
     *   retry      = maximum number of polls
     * Returns: rows of values; on the SFTP path only the optional header is
     *          returned when authentication fails or no reply arrives.
     */
    static string[][] getData(string host, string user, string pass, string[] fields, string[] idents, bool withHeader = true, long interval = 10, long retry = 100)
    {
        if (useSoap(host))
        {
            return soapGetData(host, user, pass, fields, idents, withHeader, interval, retry);
        }
        string[] header = ["BbgID"];
        header ~= fields;
        return requestViaSftp(host, user, pass, "D", header,
            getDataTemplateString(user, fields, idents), withHeader, interval, retry);
    }

    /**
     * Requests historical values of `fields` for `idents` between
     * `startDate` and `endDate` from pricing source `source`.
     *
     * Transport selection, polling parameters and failure behaviour are the
     * same as for `getData`; the header row is "BbgID", "Date" followed by
     * `fields`.
     */
    static string[][] getHistorical(string host, string user, string pass, string[] fields, string[] idents, string source, long startDate, long endDate, bool withHeader = true, long interval = 10, long retry = 100)
    {
        if (useSoap(host))
        {
            return soapGetHistorical(host, user, pass, fields, idents, source, startDate, endDate, withHeader, interval, retry);
        }
        string[] header = ["BbgID", "Date"];
        header ~= fields;
        return requestViaSftp(host, user, pass, "H", header,
            getHistoricalTemplateString(user, fields, idents, source, startDate, endDate),
            withHeader, interval, retry);
    }

    /// Same as `getData`, rendered as CSV text by `convertCsv`.
    static string getDataCsv(string host, string user, string pass, string[] fields, string[] idents, bool withHeader = true, long interval = 10, long retry = 100)
    {
        return convertCsv(getData(host, user, pass, fields, idents, withHeader, interval, retry));
    }

    /// Same as `getHistorical`, rendered as CSV text by `convertCsv`.
    static string getHistoricalCsv(string host, string user, string pass, string[] fields, string[] idents, string source, long startDate, long endDate, bool withHeader = true, long interval = 10, long retry = 100)
    {
        return convertCsv(getHistorical(host, user, pass, fields, idents, source, startDate, endDate, withHeader, interval, retry));
    }

    /// Fetches data as CSV and writes it to `path`; see `saveData` for the
    /// meaning of `level`.
    static void downloadData(string host, string user, string pass, string[] fields, string[] idents, string path, int level = 0, bool withHeader = true, long interval = 10, long retry = 100)
    {
        saveData(convertCsv(getData(host, user, pass, fields, idents, withHeader, interval, retry)), path, level);
    }

    /// Fetches historical data as CSV and writes it to `path`; see
    /// `saveData` for the meaning of `level`.
    static void downloadHistorical(string host, string user, string pass, string[] fields, string[] idents, string source, long startDate, long endDate, string path, int level = 0, bool withHeader = true, long interval = 10, long retry = 100)
    {
        saveData(convertCsv(getHistorical(host, user, pass, fields, idents, source, startDate, endDate, withHeader, interval, retry)), path, level);
    }
}